In [1]:
import itertools
import json
import os
import random
from pathlib import Path
import cv2
import numpy as np
import torch
from shapely import Polygon
from torch.utils.data import DataLoader
from tqdm import tqdm
from datasets.SlideSeperatedImageDataset import SlideSeperatedImageDataset
from labelers.GroundTruthLabeler import GroundTruthLabeler
from models.resnet import Resnet18BinaryClassifier, Resnet101BinaryClassifier
from utils import divide, bbox_to_points, rgb_to_bgr, downscale_points, absolute_points_to_relative, \
filter_points_within_bbox, \
is_not_mostly_blank
from utils import show_cv2_image
In [2]:
# --- Paths and model configuration ---
slides_root_dir = "data/whole-slides/gut"      # whole-slide images (.svs)
labels_root_dir = "data/labels"                # ground-truth annotation files
candidates_dataset_dir = "output/candidates"   # pre-extracted candidate patches
model_output_dir = "output/models"             # trained models + persisted data split
# Backbone for the binary patch classifier; swap the class to change backbones.
PretrainedModelClass = Resnet101BinaryClassifier
pretrained_model_name = PretrainedModelClass.get_pretrained_model_name()
pretrained_output_size = PretrainedModelClass.pretrained_output_size
print(f"{pretrained_model_name}: {pretrained_output_size} features")
Resnet101: 2048 features
In [3]:
# Load ground-truth labels, restore the persisted train/test slide split,
# validate it, and load the trained classifier.
ground_truth_labeler = GroundTruthLabeler(f"{labels_root_dir}/slide-annotations/all.json",
                                          f"{labels_root_dir}/patch-classifications.csv")
with open(f"{model_output_dir}/data-split.json", 'r') as f:
    backup = json.load(f)
slides_df = ground_truth_labeler.positive_regions_summary
train_slides, test_slides = set(backup["train_slides"]), set(backup["test_slides"])
# Sanity check: the persisted split must cover every slide exactly once
# (union equals all slides, intersection empty).
if train_slides.union(test_slides) != set(slides_df["slide_name"]) or len(train_slides.intersection(test_slides)) > 0:
    raise ValueError("Invalid train test split")
# NOTE(review): train_slides/test_slides are rebound here from sets of names
# to DataFrame row subsets — later cells rely on the DataFrame form.
train_slides = slides_df[slides_df['slide_name'].isin(train_slides)]
test_slides = slides_df[slides_df['slide_name'].isin(test_slides)]
# NOTE(review): torch.load unpickles arbitrary objects — only load trusted files.
model = PretrainedModelClass(model=torch.load(f"{model_output_dir}/{pretrained_model_name}.pickle"))
print("Test slides")
test_slides
Test slides
Out[3]:
| slide_name | n_gt_positive_regions | category | |
|---|---|---|---|
| 5 | 593437 | 94 | 3 |
| 7 | 593446 | 15 | 1 |
| 8 | 593436 | 170 | 4 |
| 12 | 593435 | 109 | 4 |
| 15 | 522021 | 3 | 1 |
| 16 | 593453 | 19 | 2 |
| 18 | 593434 | 27 | 2 |
In [4]:
# Move the model to GPU when available and build the evaluation pipeline.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(f"Device: {device}")
model = model.to(device)

batch_size = 256
# with_index=True makes each dataset item carry its index, so predictions can
# later be mapped back to their source patch files (see the inference cell).
test_slide_names = set(test_slides["slide_name"])
test_dataset = SlideSeperatedImageDataset(candidates_dataset_dir, test_slide_names, with_index=True)
# test_dataset = reduce_dataset(test_dataset, discard_ratio=0.99)
# shuffle=False keeps iteration order stable and aligned with the dataset.
test_loader = DataLoader(test_dataset,
                         batch_size=batch_size,
                         shuffle=False, )
print(f"Candidates: {len(test_dataset):,}")
Device: cuda:0 Candidates: 14,346
In [5]:
# Run inference over all candidate patches, collecting per-patch scores and
# the dataset indexes needed to map scores back to patch files.
model.eval()
indexes = []
probs = []
with torch.no_grad():
    for x_test, y_test, index in tqdm(test_loader, desc="Testing"):
        x_test = x_test.to(device)
        test_logits = model.forward(x_test)
        indexes.append(index)
        # squeeze(-1) instead of squeeze(): a bare squeeze() on a final batch
        # of size 1 would collapse (1, 1) to a 0-d tensor and break torch.cat.
        probs.append(test_logits.squeeze(-1))
indexes = torch.cat(indexes).to("cpu")
probs = torch.cat(probs).to("cpu")
# NOTE(review): these are the raw model outputs; confirm the model applies a
# sigmoid internally before the 0.3 threshold is applied in the next cell.
Testing: 100%|██████████| 57/57 [00:21<00:00, 2.67it/s]
In [6]:
# Threshold the scores and group predicted-positive patch bboxes by slide.
threshold = 0.3
predictions = probs > threshold
# `predictions` is already a boolean mask; no need to compare against 1.
predicted_positives = indexes[predictions]
predicted_positive_bboxes_by_slide = {}
# The dataset may have been wrapped (e.g. by a Subset); unwrap down to the
# base dataset that knows each item's file path.
while not isinstance(test_dataset, SlideSeperatedImageDataset):
    test_dataset = test_dataset.dataset
for item_index in predicted_positives:
    file_path = test_dataset.get_item_file_path(item_index)
    # Patch files are named "<slide>_<x>_<y>_<w>_<h>"; parse the bbox back out.
    slide, x_min, y_min, width, height = Path(file_path).stem.split("_")
    bbox = (int(x_min), int(y_min), int(width), int(height))
    predicted_positive_bboxes_by_slide.setdefault(slide, []).append(bbox)
In [7]:
import json

# Persist the per-slide predicted bounding boxes for reuse outside this notebook.
with open("temp.json", "w") as f:
    f.write(json.dumps(predicted_positive_bboxes_by_slide))
In [8]:
def calculate_iou(poly, bbox):
    """Intersection-over-union between polygon `poly` and bbox polygon `bbox`."""
    repaired = poly.buffer(0)  # buffer(0) repairs self-intersecting geometry
    intersection_area = repaired.intersection(bbox).area
    union_area = repaired.union(bbox).area
    if union_area > 0:
        return intersection_area / union_area
    return 0
def calculate_iogt(poly, bbox):
    """Fraction of ground-truth polygon `poly` covered by `bbox` (intersection over GT)."""
    if poly.area <= 0:
        return 0
    overlap_area = poly.buffer(0).intersection(bbox).area
    return overlap_area / poly.area
def calculate_iopd(poly, bbox):
    """Fraction of `bbox` covered by polygon `poly` (intersection over prediction)."""
    if bbox.area <= 0:
        return 0
    overlap_area = poly.buffer(0).intersection(bbox).area
    return overlap_area / bbox.area
def calculate_metrics(confusion_matrix):
    """Derive (precision, recall, f1) from a {"TP", "FP", "FN"} count dict.

    Uses the project `divide` helper, which presumably guards division by
    zero — TODO confirm against utils.
    """
    tp = confusion_matrix["TP"]
    fp = confusion_matrix["FP"]
    fn = confusion_matrix["FN"]
    precision = divide(tp, tp + fp)
    recall = divide(tp, tp + fn)
    f1 = divide(2 * precision * recall, precision + recall)
    return precision, recall, f1
def is_matched(gt, pred, i_threshold=0.5):
    """Return True when prediction `pred` covers more than `i_threshold` of ground truth `gt`.

    Only intersection-over-ground-truth is used for matching; the IoU and
    intersection-over-prediction criteria were disabled (see the commented
    alternatives), so their computation — two extra shapely intersections per
    (gt, pred) pair inside a double loop — is skipped entirely here.
    """
    # iou = calculate_iou(gt, pred)
    # ipd = calculate_iopd(gt, pred)
    return calculate_iogt(gt, pred) > i_threshold  # or iou > i_threshold or ipd > i_threshold
def calculate_iou_confusion_matrix(ground_truth_polygons, predicted_bboxes, i_threshold=0.25):
    """Count TP/FP/FN by matching ground-truth polygons against predicted bboxes.

    TP = ground-truth regions matched by at least one prediction,
    FP = predictions that matched no ground truth,
    FN = ground-truth regions no prediction matched.
    """
    gt_polys = [Polygon(points).buffer(0) for points in ground_truth_polygons]
    pred_polys = []
    for x, y, w, h in predicted_bboxes:
        corners = [(x, y), (x + w, y), (x + w, y + h), (x, y + h)]
        pred_polys.append(Polygon(corners).buffer(0))

    matched_gt = set()
    matched_pred = set()
    for gt_index, gt_poly in enumerate(gt_polys):
        for pred_index, pred_poly in enumerate(pred_polys):
            if is_matched(gt_poly, pred_poly, i_threshold):
                matched_gt.add(gt_index)
                matched_pred.add(pred_index)

    return {
        "TP": len(matched_gt),
        "FP": len(pred_polys) - len(matched_pred),
        "FN": len(gt_polys) - len(matched_gt),
    }
# Evaluate detection quality per test slide, then pooled over all slides.
total_confusion_matrix = {"TP": 0, "FP": 0, "FN": 0}
for slide_name in test_slides.slide_name:
    ground_truth_positive_regions = ground_truth_labeler.get_positive_regions(slide_name)
    predicted_positive_bboxes = predicted_positive_bboxes_by_slide.get(slide_name, [])
    confusion_matrix = calculate_iou_confusion_matrix(ground_truth_positive_regions, predicted_positive_bboxes)
    precision, recall, f1 = calculate_metrics(confusion_matrix)
    # Pool raw counts (micro-averaging) rather than averaging per-slide metrics.
    for key in ("TP", "FP", "FN"):
        total_confusion_matrix[key] += confusion_matrix[key]
    n_ground_truth_pos = len(ground_truth_positive_regions)
    n_cv_candidate_pos = test_dataset.slide_to_dataset[slide_name].labels.sum().item()
    print(
        f"{slide_name}: {n_ground_truth_pos:03d} ground truth positives, {n_cv_candidate_pos:03d} positive candidate patches, precision: {precision:.6f}, recall: {recall:.6f}, f1: {f1:.6f}")
total_precision, total_recall, total_f1 = calculate_metrics(total_confusion_matrix)
print()
print(f"Overall: precision: {total_precision:.6f}, recall: {total_recall:.6f}, f1: {total_f1:.6f}")
593437: 094 ground truth positives, 065 positive candidate patches, precision: 0.477612, recall: 0.340426, f1: 0.397516 593446: 015 ground truth positives, 008 positive candidate patches, precision: 0.094595, recall: 0.466667, f1: 0.157303 593436: 170 ground truth positives, 116 positive candidate patches, precision: 0.679012, recall: 0.647059, f1: 0.662651 593435: 109 ground truth positives, 072 positive candidate patches, precision: 0.555556, recall: 0.504587, f1: 0.528846 522021: 003 ground truth positives, 002 positive candidate patches, precision: 0.011364, recall: 0.333333, f1: 0.021978 593453: 019 ground truth positives, 010 positive candidate patches, precision: 0.103448, recall: 0.157895, f1: 0.125000 593434: 027 ground truth positives, 014 positive candidate patches, precision: 0.269231, recall: 0.518519, f1: 0.354430 Overall: precision: 0.388792, recall: 0.508009, f1: 0.440476
In [9]:
# OpenSlide's native DLLs must be on the DLL search path on Windows.
# os.add_dll_directory exists only on Windows, so its presence is used to
# distinguish platforms.
if hasattr(os, 'add_dll_directory'):
    # Windows
    OPENSLIDE_PATH = os.path.join(os.path.abspath(os.getcwd()), "libs/openslide-bin-4.0.0.3-windows-x64/bin")
    with os.add_dll_directory(OPENSLIDE_PATH):
        import openslide
else:
    import openslide
def draw_gt_and_pred(slide_name, cell, cell_bbox, thickness=2):
    """Draw ground-truth regions (red) and predicted bboxes (green=TP, blue=FP) onto `cell`.

    Relies on the module-level `level_downsample` to scale absolute slide
    coordinates down to the preview resolution.

    :param slide_name: slide whose annotations and predictions are drawn
    :param cell: image array of one preview grid cell, drawn on in place
    :param cell_bbox: (x, y, w, h) of the cell in absolute slide coordinates
    :param thickness: outline thickness in pixels
    :return: the annotated `cell` image
    """
    gt_in_cell = ground_truth_labeler.get_positive_regions(slide_name, cell_bbox)
    for ground_truth_region in gt_in_cell:
        ground_truth_points = absolute_points_to_relative(ground_truth_region, cell_bbox)
        ground_truth_points = downscale_points(ground_truth_points, level_downsample)
        cv2.polylines(cell, [np.array(ground_truth_points).reshape((-1, 1, 2))], isClosed=True,
                      color=rgb_to_bgr((255, 0, 0)), thickness=thickness)
    # BUG FIX: this previously read the global `preview_slide_name` instead of
    # the `slide_name` parameter, silently ignoring the argument.
    predicted_positive_bboxes = predicted_positive_bboxes_by_slide.get(slide_name, [])
    for pred_bbox in predicted_positive_bboxes:
        pred_points = bbox_to_points(pred_bbox)
        # Classify TP/FP before clipping to the cell, so colouring is not
        # affected by the cell boundary.
        is_correct = any(is_matched(Polygon(gt), Polygon(pred_points)) for gt in gt_in_cell)
        pred_points = filter_points_within_bbox(pred_points, cell_bbox)
        if len(pred_points) == 0:
            continue
        pred_points = absolute_points_to_relative(pred_points, cell_bbox)
        pred_points = downscale_points(pred_points, level_downsample)
        cv2.polylines(cell, [np.array(pred_points).reshape((-1, 1, 2))], isClosed=True,
                      color=rgb_to_bgr((0, 255, 0) if is_correct else (0, 0, 255)), thickness=thickness)
    return cell
def add_legends(fig, ax):
    """Add a colour legend (red=GT, blue=FP, green=TP) beneath the figure."""
    legend_entries = [
        (0.05, 'Ground truth positive', 'red'),
        (0, 'Predicted False positive', 'blue'),
        (-0.05, 'Predicted True positive', 'green'),
    ]
    x_center = 0.5  # Centered in figure coordinates (0 = left, 1 = right)
    for y_position, label, label_color in legend_entries:
        fig.text(x_center, y_position, label, color=label_color, fontsize=12, ha='center')
preview_slide_name = "593435" # random.choice(list(test_slides.slide_name))
print(preview_slide_name)
preview_level = 1
preview_cell_size = 4096
slide = openslide.OpenSlide(f"{slides_root_dir}/{preview_slide_name}.svs")
level_downsample = slide.level_downsamples[preview_level]
ds_cell_size = int(preview_cell_size / level_downsample)
full_slide_width, full_slide_height = slide.level_dimensions[0]
preview_bboxes = [(x, y, preview_cell_size, preview_cell_size) for x, y in
itertools.product(range(0, full_slide_width, preview_cell_size),
range(0, full_slide_height, preview_cell_size))]
titles = []
for i in range(len(preview_bboxes)):
x, y, _, _ = preview_bboxes[i]
preview_region = np.array(slide.read_region((x, y), preview_level, (ds_cell_size, ds_cell_size)).convert("RGBA"))
if not is_not_mostly_blank(preview_region):
continue
preview_region = draw_gt_and_pred(preview_slide_name, preview_region, preview_bboxes[i], thickness=4)
show_cv2_image(preview_region, cb=add_legends,
title=f"Slide {preview_slide_name}, ({x}, {y}, {preview_cell_size}, {preview_cell_size})", figsize=(10,10))
slide.close()
593435